package com.fwmagic.flink.projectcase.queryactivitycase.map;

import com.fwmagic.flink.projectcase.queryactivitycase.bean.ActivityBean;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

/**
 * Maps a comma-separated event line to an {@link ActivityBean}, looking up the
 * activity name in the {@code t_activities} table over JDBC.
 *
 * <p>Expected input layout: {@code uid,aid,time,eventType,province}, for example
 * {@code u001,A1,2019-09-0210:10:11,1,北京市}. If no row matches the activity id,
 * the bean is created with a {@code null} activity name.
 *
 * <p>The JDBC connection and the lookup statement are created in
 * {@link #open(Configuration)} and released in {@link #close()}.
 */
public class DataToActivityBeanMapFunction extends RichMapFunction<String, ActivityBean> {

    // NOTE(review): connection settings and credentials are hard-coded; consider
    // supplying them through job configuration instead of source code.
    private static final String JDBC_URL =
            "jdbc:mysql://localhost:3306/flink-test?useUnicode=true&characterEncoding=UTF-8";
    private static final String JDBC_USER = "root";
    private static final String JDBC_PASSWORD = "123456";

    private static final String ACTIVITY_NAME_SQL = "select name from t_activities where id =?";

    /** Minimum number of comma-separated fields a line must contain. */
    private static final int REQUIRED_FIELD_COUNT = 5;

    // JDBC handles are not serializable; they are created in open(), so they are
    // marked transient to keep them out of the serialized function instance.
    private transient Connection connection;
    private transient PreparedStatement activityNameStatement;

    /**
     * Opens the JDBC connection and prepares the activity-name lookup statement
     * so that it can be reused by every {@link #map(String)} call.
     *
     * @param parameters configuration passed through to the superclass
     * @throws Exception if the connection cannot be established or the statement
     *                   cannot be prepared
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
        activityNameStatement = connection.prepareStatement(ACTIVITY_NAME_SQL);
    }

    /**
     * Parses one event line and enriches it with the activity name from the database.
     *
     * @param line comma-separated record: {@code uid,aid,time,eventType,province}
     * @return the bean built from the parsed fields and the looked-up activity name
     *         ({@code null} name when the activity id has no matching row)
     * @throws IllegalArgumentException if the line has fewer than five fields, or the
     *                                  event type is not an integer
     *                                  ({@link NumberFormatException})
     * @throws Exception                if the database lookup fails
     */
    @Override
    public ActivityBean map(String line) throws Exception {
        // Example input: u001,A1,2019-09-0210:10:11,1,北京市
        String[] fields = line.split(",");
        if (fields.length < REQUIRED_FIELD_COUNT) {
            throw new IllegalArgumentException(
                    "Expected at least " + REQUIRED_FIELD_COUNT
                            + " comma-separated fields but got " + fields.length
                            + " in line: " + line);
        }
        String uid = fields[0];
        String aid = fields[1];
        String time = fields[2];
        int eventType = Integer.parseInt(fields[3]);
        String province = fields[4];

        String activityName = null;
        activityNameStatement.setString(1, aid);
        // try-with-resources guarantees the result set is closed even if reading fails.
        try (ResultSet rs = activityNameStatement.executeQuery()) {
            // If several rows match, the last one wins (same as the previous behaviour).
            while (rs.next()) {
                activityName = rs.getString(1);
            }
        }
        return ActivityBean.of(uid, aid, activityName, time, eventType, province);
    }

    /**
     * Releases the prepared statement and the connection. Safe to call even when
     * {@link #open(Configuration)} failed part-way, and the connection is still
     * closed if closing the statement throws.
     *
     * @throws Exception if the superclass or any JDBC resource fails to close
     */
    @Override
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            try {
                if (activityNameStatement != null) {
                    activityNameStatement.close();
                }
            } finally {
                if (connection != null) {
                    connection.close();
                }
            }
        }
    }
}
